#include "config.h"

#define DBG_SUBSYS S_LIBCONTROL

#include "lich_api.h"
#include "lich_qos.h"

#include "volume_ctl_internal.h"
#include "volume_bh.h"
#include "ramdisk.h"
#include "dbg.h"
#include "lsv_volume_gc.h"

/*
 * One peer node and the client addresses registered through it for a volume.
 *
 * `hook` must remain the first member: the connection-list walkers in this
 * file cast a `struct list_head *` directly to `connect_srv_t *`.
 */
typedef struct {
        struct list_head hook; /* link in volume_proto->connect.connect_list */
        nid_t nid;             /* peer node id */
        time_t ltime;          /* value reported by network_ltime() when the entry was created */
        int count;             /* number of valid entries in addrs[] */
        uint32_t addrs[VOLUME_MAX_CONNECTION]; /* addresses as returned by inet_addr() */
} connect_srv_t;

/*
 * List node carrying a single node id.
 * NOTE(review): no user of this type is visible in this part of the file.
 */
typedef struct {
        struct list_head hook; /* list linkage */
        nid_t nid;             /* node id stored in this entry */
} vfm_entry_t;



// -- forward declarations (only compiled when USE_ROW2 is defined)
#ifdef USE_ROW2
static int __try_load_lsv(volume_proto_t *volume_proto);
//static void __volume_proto_destroy(volume_proto_t *volume_proto);
#endif

/**
 * Forward a set-attribute request to the volume's table1.
 *
 * @param volume_proto volume whose table1 handles the request
 * @param _fileinfo    passed unchanged to table1->setattr
 * @param setattr      passed unchanged to table1->setattr
 * @return the value returned by table1->setattr
 */
STATIC int __volume_proto_setattr(volume_proto_t *volume_proto, fileinfo_t *_fileinfo,
                                  const setattr_t *setattr)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->setattr(tbl, _fileinfo, setattr);
}

/**
 * Forward a get-attribute request to the volume's table1.
 *
 * @param volume_proto volume whose table1 handles the request
 * @param fileinfo     passed unchanged to table1->getattr
 * @return the value returned by table1->getattr
 */
STATIC int __volume_proto_getattr(volume_proto_t *volume_proto, fileinfo_t *fileinfo)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->getattr(tbl, fileinfo);
}

/**
 * Look up chunk info, dispatching on the chunk type.
 *
 * Chunks of type __RAW_CHUNK__ are resolved by table2; every other type is
 * resolved by table1.
 *
 * @param volume_proto volume owning both tables
 * @param chkid        chunk to look up
 * @param _chkinfo     output buffer handed to the table's chunk_getinfo
 * @return 0 on success, otherwise the error returned by the table
 */
STATIC int __volume_proto_chunk_getinfo(volume_proto_t *volume_proto, const chkid_t *chkid,
                                      chkinfo_t *_chkinfo)
{
        int ret;

        if (chkid->type != __RAW_CHUNK__) {
                table1_t *meta = &volume_proto->table1;

                ret = meta->chunk_getinfo(meta, chkid, _chkinfo);
        } else {
                table2_t *raw = &volume_proto->table2;

                ret = raw->chunk_getinfo(raw, chkid, _chkinfo);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Invoke the chunk_set hook for a chunk, dispatching on the chunk type.
 *
 * __RAW_CHUNK__ goes to table2, everything else to table1. Both hooks also
 * receive the volume itself as their last argument.
 *
 * @param volume_proto volume owning both tables
 * @param chkid        chunk to update
 * @param nid          node id passed through to chunk_set
 * @param status       status passed through to chunk_set
 * @return 0 on success, otherwise the error returned by the table
 */
STATIC int __volume_proto_chunk_set(volume_proto_t *volume_proto, const chkid_t *chkid,
                                  const nid_t *nid, int status)
{
        int ret;

        if (chkid->type != __RAW_CHUNK__) {
                table1_t *meta = &volume_proto->table1;

                ret = meta->chunk_set(meta, chkid, nid, status, volume_proto);
        } else {
                table2_t *raw = &volume_proto->table2;

                ret = raw->chunk_set(raw, chkid, nid, status, volume_proto);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Run a write-mode chunk check inside a recovery enter/exit bracket.
 *
 * volume_proto_recovery_exit() is called only when
 * volume_proto_recovery_enter() succeeded, so the two stay balanced.
 * NOTE(review): this assumes a failed recovery_enter leaves nothing to
 * release - confirm against its implementation.
 *
 * @param volume_proto volume owning the chunk
 * @param chkid        chunk to check
 * @param oflags       passed through to volume_proto_chunk_check
 * @return 0 on success; ENOENT is returned without being logged via GOTO;
 *         any other error is returned after GOTO
 */
STATIC int __volume_proto_chunk_check(volume_proto_t *volume_proto, const chkid_t *chkid, int *oflags)
{
        int ret;

        DBUG("chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ret = volume_proto_recovery_enter(volume_proto, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = volume_proto_chunk_check(volume_proto, chkid, __OP_WRITE, oflags);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        goto err_exit;
                } else
                        GOTO(err_exit, ret);
        }

        volume_proto_recovery_exit(volume_proto, chkid);
        return 0;
err_exit:
        /* Only reached after a successful recovery_enter. */
        volume_proto_recovery_exit(volume_proto, chkid);
err_ret:
        return ret;
}

/**
 * Lease check run before each operation: decide whether the volume has to be
 * reloaded.
 *
 * A reload is requested when any of the cached lease times (volume, table1,
 * table2) is zero, or when network_connect() to table1's parent node fails or
 * reports a lease time different from the cached one.
 *
 * @param volume_proto volume to inspect
 * @return 1 if a reload is needed, 0 otherwise
 */
STATIC int __volume_proto_needreload(volume_proto_t *volume_proto)
{
        int ret;
        /* Initialised so the log below never prints an indeterminate value
         * when network_connect() fails. */
        time_t ltime = 0;

        if (unlikely(volume_proto->ltime == 0 || volume_proto->table1.ltime == 0
            || volume_proto->table2.ltime == 0)) {
                DINFO("table %p "CHKID_FORMAT" need reload, %u, %u, %u\n",
                      volume_proto, CHKID_ARG(&volume_proto->chkid), (int)volume_proto->ltime,
                      (int)volume_proto->table1.ltime, (int)volume_proto->table2.ltime);
                return 1;
        }

        ret = network_connect(&volume_proto->table1.parentnid, &ltime, 1, 0);
        if (unlikely(ret || (ltime != volume_proto->ltime))) {
                DINFO("table %p "CHKID_FORMAT" need check, ret %u ltime %u %u, parent @ %s\n",
                      volume_proto, CHKID_ARG(&volume_proto->chkid), ret, (int)ltime,
                      (int)volume_proto->ltime,
                      network_rname(&volume_proto->table1.parentnid));
                return 1;
        }

        return 0;
}

/**
 * Resolve the parent node of the volume, connect to it and record its lease
 * time in the volume and in both tables.
 *
 * The parent nid is the admin node when `parent` is the root, otherwise it is
 * looked up with md_map_getsrv(). After connecting, md_chunk_getinfo1() is
 * called for `chkid`; only its return code is used here.
 *
 * @param volume_proto volume being connected
 * @param parent       id of the parent
 * @param chkid        chunk id passed to md_chunk_getinfo1
 * @return 0 on success, otherwise the first error encountered
 */
STATIC int __volume_proto_connect(volume_proto_t *volume_proto,
                                  const fileid_t *parent, const chkid_t *chkid)
{
        int ret;
        char _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;
        table1_t *table1 = &volume_proto->table1;

        if (!chkid_isroot(parent)) {
                ret = md_map_getsrv(parent, &table1->parentnid);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                table1->parentnid = *net_getadmin();
        }

        YASSERT(!net_isnull(&table1->parentnid));

        ret = network_connect(&table1->parentnid, &volume_proto->ltime, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = md_chunk_getinfo1(table1->pool, parent, chkid, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* Propagate the freshly obtained lease time to both tables. */
        table1->ltime = volume_proto->ltime;
        volume_proto->table2.ltime = volume_proto->ltime;

        DINFO("table %p "CHKID_FORMAT" ltime, %u, %u, %u\n",
                        volume_proto, CHKID_ARG(&volume_proto->chkid), (int)volume_proto->ltime,
                        (int)table1->ltime, (int)volume_proto->table2.ltime);

        return 0;
err_ret:
        return ret;
}

/**
 * Clean up table2 and then table1 of the volume.
 *
 * table1 is not touched if table2_cleanup() fails.
 *
 * @param volume_proto volume to clean up
 * @return 0 on success, otherwise the first error returned
 */
STATIC int __volume_proto_cleanup(volume_proto_t *volume_proto)
{
        int ret = table2_cleanup(&volume_proto->table2);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = table1_cleanup(&volume_proto->table1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Clean up only table1 of the volume (table2 is left alone).
 *
 * @param volume_proto volume to act on
 * @return 0 on success, otherwise the error from table1_cleanup()
 */
STATIC int __volume_proto_suicide(volume_proto_t *volume_proto)
{
        int ret = table1_cleanup(&volume_proto->table1);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}


/**
 * Thin wrapper that discards a chunk via volume_proto_discard().
 *
 * @return the value returned by volume_proto_discard()
 */
STATIC int __volume_proto_cleanup_bh1(volume_proto_t *volume_proto, const chkid_t *chkid)
{
        int ret = volume_proto_discard(volume_proto, chkid);

        return ret;
}

/**
 * Thin wrapper that invokes the volume's own cleanup hook.
 *
 * @return the value returned by volume_proto->cleanup
 */
STATIC int __volume_proto_cleanup_bh2(volume_proto_t *volume_proto)
{
        int ret = volume_proto->cleanup(volume_proto);

        return ret;
}


/**
 * Set an extended attribute by delegating to table1->xattr_set.
 *
 * All arguments after `volume_proto` are passed through unchanged.
 *
 * @return the value returned by table1->xattr_set
 */
STATIC int __volume_proto_xattr_set(volume_proto_t *volume_proto, const char *key, const char *value,
                        uint32_t valuelen, int flag)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->xattr_set(tbl, key, value, valuelen, flag);
}

/**
 * Read an extended attribute by delegating to table1->xattr_get.
 *
 * All arguments after `volume_proto` are passed through unchanged.
 *
 * @return the value returned by table1->xattr_get
 */
STATIC int __volume_proto_xattr_get(volume_proto_t *volume_proto, const char *key, char *value,
                                 int *valuelen)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->xattr_get(tbl, key, value, valuelen);
}

/**
 * List extended attributes by delegating to table1->xattr_list.
 *
 * @param buf    passed unchanged to table1->xattr_list
 * @param buflen passed unchanged to table1->xattr_list
 * @return the value returned by table1->xattr_list
 */
STATIC int __volume_proto_xattr_list(volume_proto_t *volume_proto, char *buf, int *buflen)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->xattr_list(tbl, buf, buflen);
}

/**
 * Remove an extended attribute by delegating to table1->xattr_remove.
 *
 * @param key passed unchanged to table1->xattr_remove
 * @return the value returned by table1->xattr_remove
 */
STATIC int __volume_proto_xattr_remove(volume_proto_t *volume_proto, const char *key)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->xattr_remove(tbl, key);
}

/**
 * Sync one chunk inside a recovery enter/exit bracket.
 *
 * __RAW_CHUNK__ is synced through table2, every other chunk type through
 * table1 (which additionally receives the volume). volume_proto_recovery_exit()
 * is called only when volume_proto_recovery_enter() succeeded, so the two stay
 * balanced.
 * NOTE(review): this assumes a failed recovery_enter leaves nothing to
 * release - confirm against its implementation.
 *
 * @param volume_proto volume owning the chunk
 * @param chkid        chunk to sync
 * @param oflags       passed through to the table's chunk_sync
 * @return 0 on success, otherwise the first error encountered
 */
STATIC int __volume_proto_chunk_sync(volume_proto_t *volume_proto, const chkid_t *chkid, int *oflags)
{
        int ret;
        table1_t *table1;
        table2_t *table2;

        // SCHEDULE_LEASE_SET();

        ret = volume_proto_recovery_enter(volume_proto, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        SCHEDULE_LEASE_SET();

        if (chkid->type == __RAW_CHUNK__) {
                table2 = &volume_proto->table2;
                ret = table2->chunk_sync(table2, chkid, oflags);
                if (unlikely(ret))
                        GOTO(err_exit, ret);
        } else {
                table1 = &volume_proto->table1;
                ret = table1->chunk_sync(table1, chkid, volume_proto, oflags);
                if (unlikely(ret))
                        GOTO(err_exit, ret);
        }

        volume_proto_recovery_exit(volume_proto, chkid);
        return 0;
err_exit:
        /* Only reached after a successful recovery_enter. */
        volume_proto_recovery_exit(volume_proto, chkid);
err_ret:
        return ret;
}

/**
 * Invoke the chunk_cleanup hook for a chunk, dispatching on the chunk type.
 *
 * __RAW_CHUNK__ goes to table2, everything else to table1. Both hooks also
 * receive the volume itself as their last argument.
 *
 * @param volume_proto volume owning both tables
 * @param chkid        chunk to clean up
 * @param nid          node id passed through to chunk_cleanup
 * @param meta_version version passed through to chunk_cleanup
 * @return 0 on success, otherwise the error returned by the table
 */
STATIC int __volume_proto_chunk_cleanup(volume_proto_t *volume_proto, const chkid_t *chkid,
                                      const nid_t *nid, uint64_t meta_version)
{
        int ret;

        if (chkid->type != __RAW_CHUNK__) {
                table1_t *meta = &volume_proto->table1;

                ret = meta->chunk_cleanup(meta, chkid, nid, meta_version, volume_proto);
        } else {
                table2_t *raw = &volume_proto->table2;

                ret = raw->chunk_cleanup(raw, chkid, nid, meta_version, volume_proto);
        }

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Move the volume's own chunk (volume_proto->chkid) to the given nodes.
 *
 * A replica count below LICH_REPLICA_MIN is rejected with EINVAL unless the
 * volume's repnum_usr is 1 or the cluster is in solo mode. When the first
 * target node is not local, all cached lease times are zeroed.
 *
 * @param volume_proto volume to move
 * @param nid          array of target nodes; nid[0] is tested for locality
 * @param count        number of entries in `nid`
 * @return 0 on success, EINVAL for too few replicas, or the error from
 *         table1->move
 */
STATIC int __volume_proto_move(volume_proto_t *volume_proto, const nid_t *nid, int count)
{
        int ret;
        table1_t *table1 = &volume_proto->table1;

        if (count < LICH_REPLICA_MIN && table1->fileinfo.repnum_usr != 1 && !cluster_is_solomode()) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        ret = table1->move(table1, &volume_proto->chkid, nid, count);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (net_islocal(&nid[0]))
                return 0;

        DWARN("reset table "CHKID_FORMAT" nid %s\n",
              CHKID_ARG(&table1->chkid),
              network_rname(&nid[0]));

        /* Drop every cached lease time. */
        volume_proto->ltime = 0;
        table1->ltime = 0;
        volume_proto->table2.ltime = 0;

        return 0;
err_ret:
        return ret;
}

/**
 * Move one chunk to the given nodes, dispatching on the chunk type.
 *
 * - __VOLUME_CHUNK__ / __VOLUME_SUB_CHUNK__: moved through table1->move; when
 *   the first target node is not local, all cached lease times are zeroed.
 * - __RAW_CHUNK__: moved through table2->chunk_move; EEXIST is treated as
 *   success.
 * - any other type trips YASSERT(0).
 *
 * A replica count below LICH_REPLICA_MIN is rejected with EINVAL unless the
 * volume's repnum_usr is 1 or the cluster is in solo mode.
 *
 * @return 0 on success, otherwise the error encountered
 */
STATIC int __volume_proto_chunk_move(volume_proto_t *volume_proto, const chkid_t *chkid, const nid_t *nid, int count)
{
        int ret;

        if (count < LICH_REPLICA_MIN && volume_proto->table1.fileinfo.repnum_usr != 1 && !cluster_is_solomode()) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        if (chkid->type == __VOLUME_CHUNK__ || chkid->type == __VOLUME_SUB_CHUNK__) {
                table1_t *meta = &volume_proto->table1;

                ret = meta->move(meta, chkid, nid, count);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (!net_islocal(&nid[0])) {
                        DWARN("reset table "CHKID_FORMAT" nid %s\n",
                              CHKID_ARG(&meta->chkid),
                              network_rname(&nid[0]));

                        /* Drop every cached lease time. */
                        volume_proto->ltime = 0;
                        meta->ltime = 0;
                        volume_proto->table2.ltime = 0;
                }
        } else if (chkid->type == __RAW_CHUNK__) {
                table2_t *raw = &volume_proto->table2;

                ret = raw->chunk_move(raw, chkid, nid, count, volume_proto);
                /* EEXIST counts as success. */
                if (unlikely(ret) && ret != EEXIST)
                        GOTO(err_ret, ret);
        } else {
                YASSERT(0);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Localize a chunk through table2->chunk_localize.
 *
 * EEXIST from the table is treated as success.
 *
 * @param volume_proto volume owning the chunk (also passed to the hook)
 * @param chkid        chunk to localize
 * @return 0 on success or EEXIST, otherwise the error from the table
 */
STATIC int __volume_proto_chunk_localize(volume_proto_t *volume_proto, const chkid_t *chkid)
{
        table2_t *raw = &volume_proto->table2;
        int ret = raw->chunk_localize(raw, chkid, volume_proto);

        if (unlikely(ret) && ret != EEXIST)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Fill `filestat` for the volume.
 *
 * With off == 0 and size == 0 the statistics come from table1->stat; otherwise
 * they come from table2->chunk_stat for the given range. The remaining fields
 * are then copied from table1's cached fileinfo. For clone volumes the source
 * name is read from the LICH_SYSTEM_ATTR_SOURCE xattr into filestat->src. If a
 * snapshot rollback is pending (snap_version != snap_rollback) a
 * BH_SNAP_ROLLBACK job is queued with volume_bh_add().
 *
 * @param volume_proto volume to describe
 * @param filestat     output structure
 * @param off          start of the range, or 0 together with size 0 for the whole volume
 * @param size         length of the range, or 0 together with off 0 for the whole volume
 * @return 0 on success, otherwise the first error encountered
 */
STATIC int __volume_proto_stat(volume_proto_t *volume_proto, filestat_t *filestat, off_t off, size_t size)
{
        int ret, len;
        table2_t *table2;
        table1_t *table1;
        uint32_t chknum;
        const fileinfo_t *fileinfo;
        const uint64_t *snap_rollback;

        table1 = &volume_proto->table1;
        fileinfo = &table1->fileinfo;
        chknum = size2chknum(table1->fileinfo.size, &volume_proto->table1.fileinfo.ec);
        /* NULL means "no rollback pending"; otherwise points at the rollback version. */
        snap_rollback = (fileinfo->snap_version == fileinfo->snap_rollback)
                                 ? NULL : &fileinfo->snap_rollback;

        if (off == 0 && size == 0) {
                ret = table1->stat(table1, filestat);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        } else {
                table2 = &volume_proto->table2;
                ret = table2->chunk_stat(table2, &volume_proto->chkid, snap_rollback,
                                         filestat, off, size);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        // TODO UNIMPLEMENTED(__WARN__);
        filestat->metadata = table1->table_count;
        filestat->size = fileinfo->size;
#if LSV
        filestat->logical_size = fileinfo->logical_size;
        filestat->volume_page_id = fileinfo->volume_page_id;
        filestat->gc_os_page_id = fileinfo->gc_os_page_id;
        filestat->gc_bitmap_page_id = fileinfo->gc_bitmap_page_id;
        filestat->rcache_page_id = fileinfo->rcache_page_id;
        filestat->wbuf_page_id = fileinfo->wbuf_page_id;
        filestat->bitmap_chunk_id = fileinfo->bitmap_chunk_id;
        filestat->source = fileinfo->source;
        /* NOTE(review): unbounded copy; assumes filestat->snap is at least as
         * large as fileinfo->snap - confirm. */
        strcpy(filestat->snap, fileinfo->snap);
#endif

        filestat->chknum = chknum;
        filestat->snap_version = fileinfo->snap_version;
        filestat->snap_rollback = fileinfo->snap_rollback;
        filestat->snap_from = fileinfo->snap_from;
        filestat->attr = fileinfo->attr;

        /* Log markers for the first and the last range of a scan over the volume. */
        if (off == 0) {
                DINFO("get vol info "CHKID_FORMAT" begin\n",
                      CHKID_ARG(&volume_proto->chkid));
        }

        if (off + size == fileinfo->size) {
                DINFO("get vol info "CHKID_FORMAT" end\n",
                      CHKID_ARG(&volume_proto->chkid));
        }

        if (fileinfo->attr & __FILE_ATTR_CLONE__) {
                len = MAX_NAME_LEN;
                ret = table1->xattr_get(table1, LICH_SYSTEM_ATTR_SOURCE, filestat->src, &len);
                if (ret)
                        GOTO(err_ret, ret);
        }

        if (snap_rollback) {
                volume_bh_add(&fileinfo->id, BH_SNAP_ROLLBACK, 0);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Serialise the volume's connection list into `list`.
 *
 * Each entry is appended with _opaque_encode() as its nid followed by its
 * `count` addresses. The walk happens under the connection read lock.
 * NOTE(review): the capacity of `list` is not known here; the caller must
 * supply a buffer large enough for all entries.
 *
 * @param volume_proto volume whose connections are dumped
 * @param peer         unused
 * @param list         output buffer
 * @param _count       receives the number of bytes written to `list`
 * @return 0 on success, otherwise the error from plock_rdlock()
 */
STATIC int __volume_proto_iscsi_connection(volume_proto_t *volume_proto, const nid_t *peer, void *list, int *_count)
{
        int ret;
        int total = 0;
        uint32_t encoded;
        void *cursor = list;
        connect_srv_t *srv;
        struct list_head *pos, *n;

        (void) peer;

        ret = plock_rdlock(&volume_proto->connect.rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(pos, n, &volume_proto->connect.connect_list.list) {
                /* hook is the first member, so the list node is the entry. */
                srv = (void *)pos;

                cursor = _opaque_encode(cursor, &encoded, &srv->nid, sizeof(srv->nid),
                                srv->addrs, sizeof(*srv->addrs) * srv->count, NULL);
                total += encoded;
        }

        plock_unlock(&volume_proto->connect.rwlock);

        /* The cursor must have advanced by exactly the bytes accounted for. */
        YASSERT(cursor - list == total);
        *_count = total;

        return 0;
err_ret:
        return ret;
}

/**
 * Allocate a new connection entry for `peer` holding a single address and add
 * it to the volume's connection list.
 *
 * The function does not take the connection lock itself; the visible caller
 * holds the write lock around it.
 *
 * @param volume_proto volume receiving the connection
 * @param peer         peer node id; its lease time is read with network_ltime()
 * @param addr         address stored as the entry's first address
 * @return 0 on success, otherwise the error from network_ltime() or ymalloc()
 */
STATIC int __volume_proto_iscsi_connect_create(volume_proto_t *volume_proto, const nid_t *peer, uint32_t addr)
{
        int ret;
        time_t ltime;
        connect_srv_t *connect;

        ret = network_ltime(peer, &ltime);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&connect, sizeof(*connect));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(connect, 0x0, sizeof(*connect));

        INIT_LIST_HEAD(&connect->hook);
        connect->nid = *peer;
        connect->ltime = ltime;
        connect->count = 1;
        connect->addrs[0] = addr;

        count_list_add(&connect->hook, &volume_proto->connect.connect_list);

        /* %jd expects intmax_t; time_t is not guaranteed to have that type. */
        DINFO("%p vol "CHKID_FORMAT" from peer %s/%d/%s ltime %jd conn %d count %d\n",
              volume_proto,
              CHKID_ARG(&volume_proto->chkid),
              network_rname(peer),
              connect->nid.id,
              _inet_ntoa(addr),
              (intmax_t)connect->ltime,
              volume_proto->connect.connect_list.count,
              connect->count);

        return 0;
err_ret:
        return ret;
}

/**
 * Register the address `addr` as connected to the volume through `peer`.
 *
 * Under the connection write lock: if an entry for `peer` exists, the address
 * is appended unless it is already present; if the entry already holds
 * VOLUME_MAX_CONNECTION addresses, EMLINK is returned. If no entry exists for
 * `peer`, one is created with __volume_proto_iscsi_connect_create().
 *
 * @param volume_proto volume being connected to
 * @param peer         peer node id
 * @param addr         dotted-quad address string, converted with inet_addr()
 * @return 0 on success, EMLINK when the per-peer limit is reached, or the
 *         error from locking / entry creation
 */
STATIC int __volume_proto_iscsi_connect(volume_proto_t *volume_proto, const nid_t *peer, const char *addr)
{
        int ret, i, found_target = 0, found = 0;
        struct list_head *pos, *n;
        connect_srv_t *connect;

        in_addr_t new_addr = inet_addr(addr);

        ret = plock_wrlock(&volume_proto->connect.rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(pos, n, &volume_proto->connect.connect_list.list) {
                /* hook is the first member, so the list node is the entry. */
                connect = (void *) pos;

                if (nid_cmp(&connect->nid, peer) == 0) {
                        found_target = 1;

                        for (i=0; i < connect->count; i++) {
                                if (connect->addrs[i] == new_addr) {
                                        found = 1;
                                        break;
                                }
                        }

                        if (!found) {
                                if (connect->count >= VOLUME_MAX_CONNECTION) {
                                        ret = EMLINK;
                                        /* Report the entry's real connection count. */
                                        DERROR("maximum connection is %d, the present connection is %d, you can update it in '/opt/fusionstack/etc/lich.conf'\n",
                                               VOLUME_MAX_CONNECTION, connect->count);
                                        GOTO(err_lock, ret);
                                }

                                DINFO("%p vol "CHKID_FORMAT" from peer %s/%d/%s conn %d count %d\n",
                                      volume_proto,
                                      CHKID_ARG(&volume_proto->chkid),
                                      network_rname(peer),
                                      peer->id,
                                      addr,
                                      volume_proto->connect.connect_list.count,
                                      connect->count);

                                connect->addrs[connect->count++] = new_addr;
                        }
                }
        }

        if (!found_target) {
                ret = __volume_proto_iscsi_connect_create(volume_proto, peer, new_addr);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        plock_unlock(&volume_proto->connect.rwlock);

        return 0;
err_lock:
        plock_unlock(&volume_proto->connect.rwlock);
err_ret:
        return ret;
}

/**
 * Remove the address `addr` from the connection entry of `peer`.
 *
 * Under the connection write lock the matching address is removed by shifting
 * the following addresses down one slot. An entry whose count reaches zero is
 * unlinked and freed. Whether the address was seen is tracked per entry, so a
 * hit in one entry can never cause a shift (and an out-of-bounds write to
 * addrs[-1]) in another. The values that get logged are captured while the
 * lock is held, so no entry is dereferenced after unlock.
 *
 * @param volume_proto volume being disconnected from
 * @param peer         peer node id
 * @param addr         dotted-quad address string, converted with inet_addr()
 * @return 0 when the address was removed, ENOENT when it was not registered,
 *         or the error from plock_wrlock()
 */
STATIC int __volume_proto_iscsi_disconnect(volume_proto_t *volume_proto, const nid_t *peer, const char *addr)
{
        int ret, i, hit, found = 0;
        int remain = 0, conn_total;
        struct list_head *pos, *n;
        connect_srv_t *connect = NULL;

        in_addr_t new_addr = inet_addr(addr);

        ret = plock_wrlock(&volume_proto->connect.rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        list_for_each_safe(pos, n, &volume_proto->connect.connect_list.list) {
                /* hook is the first member, so the list node is the entry. */
                connect = (void *) pos;

                if (nid_cmp(&connect->nid, peer) != 0)
                        continue;

                hit = 0;
                for (i=0; i < connect->count; i++) {
                        if (hit) {
                                /* Close the gap left by the removed address. */
                                connect->addrs[i-1] = connect->addrs[i];
                        } else if (connect->addrs[i] == new_addr) {
                                hit = 1;
                        }
                }

                if (hit) {
                        connect->count--;
                        found = 1;
                        remain = connect->count;
                }

                if (connect->count == 0) {
                        count_list_del_init(pos, &volume_proto->connect.connect_list);
                        yfree((void **)&connect);
                }
        }

        conn_total = volume_proto->connect.connect_list.count;

        plock_unlock(&volume_proto->connect.rwlock);

        if (found) {
                DINFO("%p vol "CHKID_FORMAT" from peer %s/%d/%s conn %d count %d\n",
                      volume_proto,
                      CHKID_ARG(&volume_proto->chkid),
                      network_rname(peer),
                      peer->id,
                      addr,
                      conn_total,
                      remain);
        } else {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Pre-allocate `chknum` raw chunks of the volume.
 *
 * Clone and snapshot volumes are rejected with ENOSYS. The volume is first
 * extended to cover the largest chunk index in `chkid[]`, then all chunks are
 * created in one table2->chunk_create call; EEXIST from that call is logged
 * and treated as success.
 *
 * @param volume_proto volume to allocate in
 * @param chkid        array of `chknum` chunk ids; only chkid[0].type is asserted to be __RAW_CHUNK__
 * @param chknum       number of entries in `chkid`
 * @param fill         passed to io_opt_init()
 * @return 0 on success, ENOSYS for clone/snapshot volumes, or the first error
 *         from chunk_extend / chunk_create
 */
STATIC int __volume_proto_chunk_allocate(volume_proto_t *volume_proto,
                const chkid_t *chkid, int chknum, int fill)
{
        int ret, i, retry = 0, localize;
        uint32_t idx_max;
        table2_t *table2;
        io_opt_t io_opt;

        if (volume_proto->table1.fileinfo.attr & __FILE_ATTR_CLONE__
            || volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__) {
                ret = ENOSYS;
                GOTO(err_ret, ret);
        }

        YASSERT(chkid->type == __RAW_CHUNK__);

        /* Find the highest chunk index so the volume can be extended once. */
        idx_max = 0;
        for (i = 0; i < chknum; i++) {
                if (chkid[i].idx > idx_max) {
                        idx_max = chkid[i].idx;
                }
        }

        ret = volume_proto->chunk_extend(volume_proto, idx_max);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        table2 = &volume_proto->table2;

#if 1
        /* `retry` is only used by the disabled branch below. */
        (void) retry;

        localize = !!(volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__);
        io_opt_init(&io_opt, localize, 0, fill, 0);

        ANALYSIS_BEGIN(1);

        ret = table2->chunk_create(table2, chkid, chknum, &io_opt, volume_proto);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        DINFO("chunk "CHKID_FORMAT" exist\n", CHKID_ARG(chkid));
                        //pass
                } else
                        GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(1, IO_WARN, "volume_proto_chunk_allocate1");

#else
        /* Disabled alternative: check first, create a single chunk on ENOENT, then re-check. */
retry:
        localize = !!(volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__);
        ret = table2->chunk_check(table2, chkid, __OP_WRITE, localize, NULL, volume_proto);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        YASSERT(retry == 0);
                        retry = 1;
                        io_opt_init(&io_opt, localize, 0, fill, 0);
                        ret = table2->chunk_create(table2, chkid, 1, &io_opt, volume_proto);
                        if (unlikely(ret)) {
                                if (ret == EEXIST) {
                                        DINFO("chunk "CHKID_FORMAT" exist\n",
                                              CHKID_ARG(chkid));
                                        goto retry;
                                } else
                                        GOTO(err_ret, ret);
                        }

                        goto retry;
                } else
                        GOTO(err_ret, ret);
        }
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Run `func2` over table1's chunks and then over table1's snapshots.
 *
 * The snapshot pass is skipped if the chunk pass fails.
 *
 * @param volume_proto volume to iterate
 * @param func2        callback passed to both iterators
 * @param _arg         opaque argument passed to both iterators
 * @return 0 on success, otherwise the first error returned
 */
STATIC int __volume_proto_chunk_iterator1(volume_proto_t *volume_proto, func2_t func2, void *_arg)
{
        table1_t *meta = &volume_proto->table1;
        int ret = meta->chunk_iterator(meta, func2, _arg);

        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        DERROR("snapshot_iterator disabled\n");
#else
        ret = meta->snapshot_iterator(meta, func2, _arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Run `func2` through table2->chunk_iterator for the volume at index `idx`.
 *
 * ENOENT is returned without going through GOTO; other errors are returned
 * after GOTO.
 *
 * @param volume_proto volume to iterate
 * @param func2        callback passed to the iterator
 * @param _arg         opaque argument passed to the iterator
 * @param idx          index passed to the iterator
 * @return 0 on success, otherwise the error from the iterator
 */
STATIC int __volume_proto_chunk_iterator2(volume_proto_t *volume_proto, func2_t func2, void *_arg, uint64_t idx)
{
        table2_t *raw = &volume_proto->table2;
        int ret = raw->chunk_iterator(raw, &volume_proto->chkid, func2, _arg, idx);

        if (unlikely(ret)) {
                if (ret != ENOENT)
                        GOTO(err_ret, ret);

                goto err_ret;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Run `func3` through table1->chunk_unintact and then
 * table1->snapshot_unintact.
 *
 * The snapshot pass is skipped if the chunk pass fails.
 *
 * @param volume_proto volume to scan
 * @param func3        callback passed to both hooks
 * @param _arg         opaque argument passed to both hooks
 * @return 0 on success, otherwise the first error returned
 */
STATIC int __volume_proto_chunk_unintact1(volume_proto_t *volume_proto, func3_t func3, void *_arg)
{
        table1_t *meta = &volume_proto->table1;
        int ret = meta->chunk_unintact(meta, func3, _arg);

        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        DERROR("snapshot_unintact disabled\n");
#else
        ret = meta->snapshot_unintact(meta, func3, _arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Run `func3` through table2->chunk_unintact for index `idx`.
 *
 * Before scanning, the id derived from `idx` via fid2cid() and cid2tid() is
 * checked in write mode through table1->chunk_check. ENOENT from the table2
 * hook is returned without going through GOTO.
 *
 * @param volume_proto volume to scan
 * @param func3        callback passed to table2->chunk_unintact
 * @param _arg         opaque argument passed to the hook
 * @param idx          index to scan
 * @param deep         passed through to table2->chunk_unintact
 * @return 0 on success, otherwise the first error encountered
 */
STATIC int __volume_proto_chunk_unintact2(volume_proto_t *volume_proto, func3_t func3, void *_arg, uint64_t idx, int deep)
{
        int ret;
        chkid_t cid, tid;
        table1_t *meta = &volume_proto->table1;
        table2_t *raw = &volume_proto->table2;

        fid2cid(&cid, &volume_proto->chkid, idx);
        cid2tid(&tid, &cid);

        ret = meta->chunk_check(meta, &tid, __OP_WRITE, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = raw->chunk_unintact(raw, &volume_proto->chkid, func3, _arg, idx, deep);
        if (unlikely(ret)) {
                if (ret != ENOENT)
                        GOTO(err_ret, ret);

                goto err_ret;
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Forward a chunk-info update to table1->chunk_update.
 *
 * All arguments after `volume_proto` are passed through unchanged.
 *
 * @return the value returned by table1->chunk_update
 */
STATIC int __volume_proto_chunk_update(volume_proto_t *volume_proto,
                                       const chkinfo_t *chkinfo, const nid_t *owner, uint64_t info_version)
{
        table1_t *tbl = &volume_proto->table1;

        return tbl->chunk_update(tbl, chkinfo, owner, info_version);
}

/**
 * Unlink and free every connect_srv_t on the counted list `head`.
 *
 * The list's element counter is decremented for each entry and asserted to
 * be zero afterwards.
 *
 * @param head counted list of connect_srv_t entries
 */
static void __iscsi_connection_destroy(count_list_t *head)
{
        struct list_head *cur, *next;

        list_for_each_safe(cur, next, &head->list) {
                connect_srv_t *srv = list_entry(cur, connect_srv_t, hook);

                list_del_init(&srv->hook);
                head->count--;
                yfree((void **)&srv);
        }

        YASSERT(head->count == 0);
}

#if ECLOG_ENABLE
/**
 * Free the volume's eclog array and every non-NULL slot in it.
 *
 * Does nothing when the volume has no eclog or when its fileinfo.ec does not
 * satisfy EC_ISEC(). Before freeing, it waits until volume_proto->destroy
 * becomes 2, calling schedule_sleep() once per iteration and asserting after
 * fewer than 10 iterations.
 * NOTE(review): whoever sets destroy to 2 is not visible here - presumably the
 * eclog worker acknowledging shutdown; confirm.
 */
static void __volume_proto_eclog_destroy(volume_proto_t *volume_proto)
{
        uint32_t i, retry = 0;
        ec_t *ec;

        if (!volume_proto->eclog)
                return;

        ec = &volume_proto->table1.fileinfo.ec;
        if (!EC_ISEC(ec))
                return;

        /* Wait for destroy to reach 2 before touching the logs. */
        while (volume_proto->destroy != 2) {
                schedule_sleep("eclog_destroy", 1000 * 1000);
                retry++;
                YASSERT(retry < 10);
        }

        for (i = 0; i < ECLOG_LOG_MAX(ec); i++) {
                if (volume_proto->eclog[i])
                        yfree((void **)&volume_proto->eclog[i]);
        }

        yfree((void **)&volume_proto->eclog);
}
#endif

/**
 * Tear down a volume and free it.
 *
 * @todo This gets called multiple times, including during cleanup, from:
 *
 * volume_proto_destroy
 * __volume_proto_cleanup
 * __volume_proto_reload
 *
 * Sets volume_proto->destroy to 1, then releases (in this order) the ROW2
 * post-destroy state when USE_ROW2 is defined, the analysis state, the
 * connection list, the eclog when ECLOG_ENABLE is set, table1, table2 and the
 * rollback context, and finally frees the volume_proto itself.
 *
 * @param volume_proto volume to destroy; must not be NULL and must not be
 *                     used after this call
 */
void volume_proto_destroy(volume_proto_t *volume_proto)
{
        YASSERT(volume_proto);

        DBUG("volume %p "CHKID_FORMAT" destroy\n", volume_proto, CHKID_ARG(&volume_proto->chkid));
        volume_proto->destroy = 1;

#ifdef USE_ROW2
        //must put it in the front of destroy.
        volume_proto_post_destroy(volume_proto);
#endif

        volume_proto_analysis_destroy(volume_proto);

        __iscsi_connection_destroy(&volume_proto->connect.connect_list);
#if ECLOG_ENABLE
        __volume_proto_eclog_destroy(volume_proto);
#endif

        DBUG("table1_destory volume %p %s\n", volume_proto, id2str(&volume_proto->chkid));
        table1_destroy(&volume_proto->table1);

        DBUG("table2_destory volume %p %s\n", volume_proto, id2str(&volume_proto->chkid));
        table2_destroy(&volume_proto->table2);

        DBUG("reset table %p "CHKID_FORMAT"\n", volume_proto, CHKID_ARG(&volume_proto->chkid));
        volume_proto->ltime = 0;

        rollback_context_destroy(volume_proto);
        yfree((void **)&volume_proto);
}

/**
 * Check one chunk after extending the volume to cover its index.
 *
 * __RAW_CHUNK__ is checked through table2 (with the volume's localize
 * attribute and no snap-version output); every other type is checked through
 * table1. ENOENT from table2 is returned without going through GOTO.
 *
 * @param volume_proto volume owning the chunk
 * @param chkid        chunk to check
 * @param op           operation code passed to the table's chunk_check
 * @param oflags       passed through to the table's chunk_check
 * @return 0 on success, otherwise the first error encountered
 */
int volume_proto_chunk_check(volume_proto_t *volume_proto, const chkid_t *chkid, int op, int *oflags)
{
        int ret;

        ret = volume_proto->chunk_extend(volume_proto, chkid->idx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (chkid->type != __RAW_CHUNK__) {
                table1_t *meta = &volume_proto->table1;

                ret = meta->chunk_check(meta, chkid, op, oflags);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        } else {
                table2_t *raw = &volume_proto->table2;
                int localize = (volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__) != 0;

                ret = raw->chunk_check(raw, chkid, op, localize, NULL, oflags);
                if (unlikely(ret)) {
                        if (ret != ENOENT)
                                GOTO(err_ret, ret);

                        goto err_ret;
                }
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Make sure the raw chunk targeted by a write exists and passes the snapshot
 * check.
 *
 * table2->chunk_check is run in write mode. If it reports ENOENT and the
 * caller set IO_OPT_CREAT, the chunk is created once through
 * table2->chunk_create and the check is repeated; a second ENOENT trips
 * YASSERT(retry == 0). Chunks whose end would exceed VOLUME_SIZE_MAX are
 * rejected with EFBIG. On success volume_proto_snapshot_check() is run with
 * the snap version reported by chunk_check.
 *
 * @param volume_proto volume being written
 * @param io_opt       IO options; `flag` is read and `localize` is written before creation
 * @param chunk_io     IO descriptor; only io.id is used here
 * @return 0 on success, EFBIG, or the first error encountered
 */
STATIC int IO_FUNC __volume_proto_chunk_pre_write(volume_proto_t *volume_proto, io_opt_t *io_opt, const chunk_io_t *chunk_io)
{
        int ret, retry = 0, localize;
        table2_t *table2;
        uint64_t snap_version;
        const chkid_t *chkid = &chunk_io->io.id;

        ANALYSIS_BEGIN(0);

        YASSERT(chkid->type == __RAW_CHUNK__);

        table2 = &volume_proto->table2;
retry:
        localize = !!(volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__);
        ret = table2->chunk_check(table2, chkid, __OP_WRITE, localize, &snap_version, NULL);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        if (io_opt->flag & IO_OPT_CREAT) {
                                if (unlikely((chkid->idx + 1) * LICH_CHUNK_SPLIT > VOLUME_SIZE_MAX))  {
                                        DERROR("%s too big, volume size max: %lld byte \n", id2str(chkid), VOLUME_SIZE_MAX);
                                        ret = EFBIG;
                                        GOTO(err_ret, ret);
                                }

                                // TODO core: chunk check returns ENOENT while chunk_create returns EEXIST.
                                // If a failure happens in the middle of a metadata update (three steps:
                                // disk bitmap, sqlite, meta), different situations can result:
                                // chunk check decides based on meta information, whereas chunk_create
                                // decides based on sqlite information.
                                if (retry) {
                                        DFATAL("chunk %s retry %d\n", id2str(chkid), retry);
                                }

                                /* Creation is attempted at most once per call. */
                                YASSERT(retry == 0);
                                retry = 1;
                                io_opt->localize = localize;
                                ret = table2->chunk_create(table2, chkid, 1, io_opt, volume_proto);
                                if (unlikely(ret)) {
                                        if (ret == EEXIST) {
                                                DFATAL("chunk %s exist\n", id2str(chkid));
                                                goto retry;
                                        } else
                                                GOTO(err_ret, ret);
                                }

                                goto retry;
                        } else {
                                GOTO(err_ret, ret);
                        }
                } else
                        GOTO(err_ret, ret);
        }

        ret = volume_proto_snapshot_check(volume_proto, chkid, snap_version, __OP_WRITE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(0, IO_WARN, "volume_proto_chunk_pre_write");

        return 0;
err_ret:
        return ret;
}

/**
 * Prepare a chunk for writing.
 *
 * Runs the internal pre-write step and then table2's pre_io hook with
 * __OP_WRITE; on success the vfm clock is copied into the IO's vclock.
 *
 * @param volume_proto  volume owning the chunk
 * @param io_opt        IO options forwarded to the internal pre-write step
 * @param chunk_io      IO descriptor (chunk id, chkinfo, chkstat, vfm, vclock)
 * @return 0 on success, otherwise the error code of the failing step
 */
int IO_FUNC volume_proto_chunk_pre_write(volume_proto_t *volume_proto, io_opt_t *io_opt, chunk_io_t *chunk_io)
{
        int ret;
        table2_t *t2;
        /* Snapshot the descriptor fields up front, before any callee runs. */
        vclock_t *vc = &chunk_io->io.vclock;
        vfm_t *cur_vfm = chunk_io->vfm;
        chkstat_t *stat = chunk_io->chkstat;
        chkinfo_t *info = chunk_io->chkinfo;
        const chkid_t *id = &chunk_io->io.id;

        ret = __volume_proto_chunk_pre_write(volume_proto, io_opt, chunk_io);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        t2 = &volume_proto->table2;
        ret = t2->pre_io(t2, id, info, stat, cur_vfm, &vc->clock, __OP_WRITE);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Publish the vfm clock alongside the chunk clock filled in by pre_io. */
        vc->vfm = cur_vfm->clock;

        return 0;
err_ret:
        return ret;
}


/**
 * Reset a raw chunk through table2's reset hook.
 *
 * @param volume_proto  volume owning the chunk
 * @param chkid         chunk id; must be of type __RAW_CHUNK__ (asserted)
 * @return 0 on success, otherwise the error returned by table2->reset
 */
int volume_proto_chunk_reset(volume_proto_t *volume_proto, const chkid_t *chkid)
{
        int ret;
        table2_t *t2 = &volume_proto->table2;

        YASSERT(chkid->type == __RAW_CHUNK__);

        /* reset ltime */
        ret = t2->reset(t2, chkid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Finish an IO on a raw chunk.
 *
 * Re-checks the chunk for write access via table2->chunk_check and then
 * hands chkinfo/chkstat/clock to table2->post_io.
 *
 * @param volume_proto  volume owning the chunk
 * @param chkid         chunk id; must be of type __RAW_CHUNK__ (asserted)
 * @param chkinfo       chunk info forwarded to post_io
 * @param chkstat       chunk status forwarded to post_io
 * @param clock         clock value forwarded to post_io
 * @return 0 on success, otherwise the failing step's error code
 *         (ENOENT from the check is asserted against)
 */
int volume_proto_chunk_post_io(volume_proto_t *volume_proto, const chkid_t *chkid,
                             const chkinfo_t *chkinfo, const chkstat_t *chkstat, uint64_t clock)
{
        int ret, localize;
        uint64_t snap_version;  /* filled by chunk_check, not used afterwards */
        table2_t *t2 = &volume_proto->table2;

        ANALYSIS_BEGIN(0);

        YASSERT(chkid->type == __RAW_CHUNK__);

        localize = (volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__) ? 1 : 0;

        ret = t2->chunk_check(t2, chkid, __OP_WRITE, localize, &snap_version, NULL);
        if (unlikely(ret)) {
                DBUG(CHKID_FORMAT" postio check fail ret:%d\n", CHKID_ARG(chkid), ret);
                YASSERT(ret != ENOENT);
                GOTO(err_ret, ret);
        }

        ret = t2->post_io(t2, chkid, chkinfo, chkstat, clock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "volume_proto_chunk_post_io");

        return 0;
err_ret:
        return ret;
}

/**
 * Internal pre-read step for a raw chunk.
 *
 * Checks the chunk for read access through table2->chunk_check, bumps the
 * rollback "allocate" counter and runs the snapshot check for the
 * snapshot version reported by the chunk check.
 *
 * @param volume_proto  volume owning the chunk
 * @param chunk_io      IO descriptor; chunk_io->vfm must be non-NULL and
 *                      the chunk id must be a __RAW_CHUNK__ (both asserted)
 * @return 0 on success; ENOENT is returned without going through GOTO,
 *         any other error is reported through GOTO
 */
int IO_FUNC __volume_proto_chunk_pre_read(volume_proto_t *volume_proto, chunk_io_t *chunk_io)
{
        int ret, localize;
        uint64_t snap_version;
        vfm_t *vfm = chunk_io->vfm;
        io_t *io = &chunk_io->io;
        table2_t *t2 = &volume_proto->table2;

        YASSERT(vfm);

        ANALYSIS_BEGIN(0);

        YASSERT(io->id.type == __RAW_CHUNK__);

        localize = (volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__) ? 1 : 0;

        ret = t2->chunk_check(t2, &io->id, __OP_READ, localize, &snap_version, NULL);
        if (unlikely(ret)) {
                /* A missing chunk takes the plain goto; everything else uses GOTO. */
                if (ret != ENOENT) {
                        GOTO(err_ret, ret);
                }

                goto err_ret;
        }

        volume_proto->rollback_ctx.stat.allocate++;

        ret = volume_proto_snapshot_check(volume_proto, &io->id, snap_version, __OP_READ);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "volume_proto_chunk_pre_read");

        return 0;
err_ret:
        return ret;
}

/**
 * Prepare a chunk for reading.
 *
 * Runs the internal pre-read step and then table2's pre_io hook with
 * __OP_READ; on success the vfm clock is copied into the IO's vclock.
 *
 * @param volume_proto  volume owning the chunk
 * @param chunk_io      IO descriptor (chunk id, chkinfo, chkstat, vfm, vclock)
 * @return 0 on success, otherwise the error code of the failing step
 */
int IO_FUNC volume_proto_chunk_pre_read(volume_proto_t *volume_proto, chunk_io_t *chunk_io)
{
        int ret;
        table2_t *t2;
        /* Snapshot the descriptor fields up front, before any callee runs. */
        vfm_t *cur_vfm = chunk_io->vfm;
        io_t *req = &chunk_io->io;
        chkstat_t *stat = chunk_io->chkstat;
        chkinfo_t *info = chunk_io->chkinfo;

        ret = __volume_proto_chunk_pre_read(volume_proto, chunk_io);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        t2 = &volume_proto->table2;
        ret = t2->pre_io(t2, &req->id, info, stat, cur_vfm, &req->vclock.clock, __OP_READ);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Publish the vfm clock alongside the chunk clock filled in by pre_io. */
        req->vclock.vfm = cur_vfm->clock;

        return 0;
err_ret:
        return ret;
}

/**
 * Create a chunk pre-filled with the contents of buf.
 *
 * Only permitted when the volume carries __FILE_ATTR_SNAPSHOT__.
 *
 * @param volume_proto  target volume
 * @param chkid         id of the chunk to create
 * @param buf           data passed to table2->chunk_createwith
 * @return 0 on success, EPERM if the volume lacks the snapshot attribute,
 *         otherwise the error returned by chunk_createwith
 */
int volume_proto_newchunk(volume_proto_t *volume_proto, const chkid_t *chkid, const buffer_t *buf)
{
        int ret, localize;
        io_opt_t opt;
        table2_t *t2;

        if ((volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__) == 0) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        localize = (volume_proto->table1.fileinfo.attr & __FILE_ATTR_LOCALIZE__) ? 1 : 0;
        io_opt_init(&opt, localize, 0, 0, 0);

        t2 = &volume_proto->table2;
        ret = t2->chunk_createwith(t2, chkid, &opt, volume_proto, buf);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Ask table2 whether a chunk exists.
 *
 * @param volume_proto  volume to query
 * @param chkid         chunk id to look up
 * @param exist         output flag written by table2->chunk_exist
 * @return 0 on success, otherwise the error returned by chunk_exist
 */
int volume_proto_chunk_exist(volume_proto_t *volume_proto, const chkid_t *chkid, int *exist)
{
        int ret;
        table2_t *t2 = &volume_proto->table2;

        ret = t2->chunk_exist(t2, chkid, exist, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Discard a single chunk through table2->chunk_discard.
 *
 * @param volume_proto  volume owning the chunk
 * @param chkid         chunk to discard
 * @return 0 on success, otherwise the error returned by chunk_discard
 */
int volume_proto_discard(volume_proto_t *volume_proto, const chkid_t *chkid)
{
        int ret;
        table2_t *t2 = &volume_proto->table2;

        /* The snapshot-attribute permission check below is compiled out. */
#if 0
        if (!(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }
#endif

        ANALYSIS_BEGIN(0);

        ret = t2->chunk_discard(t2, chkid, volume_proto);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_QUEUE(0, IO_WARN, "volume_proto_discard");

        return 0;
err_ret:
        return ret;
}

/**
 * Discard a batch of chunks through table2->chunk_batch_discard.
 *
 * @param volume_proto  volume owning the chunks
 * @param chkid         array of n_chks chunk-id pointers
 * @param n_chks        number of entries in chkid
 * @return 0 on success, otherwise the error returned by chunk_batch_discard
 */
int volume_proto_batch_discard(volume_proto_t *volume_proto, const chkid_t **chkid, int n_chks)
{
        int ret;
        table2_t *table2;

        /* The snapshot-attribute permission check below is compiled out. */
#if 0
        if (!(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }
#endif

        ANALYSIS_BEGIN(0);

        table2 = &volume_proto->table2;
        ret = table2->chunk_batch_discard(table2, chkid, n_chks, volume_proto);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* Use this function's own name so the analysis record is not
         * confused with the single-chunk volume_proto_discard(). */
        ANALYSIS_QUEUE(0, IO_WARN, "volume_proto_batch_discard");

        return 0;
err_ret:
        return ret;
}

/* Forward a chunk_reject request (chunk id, bad node, chkinfo) to table1
 * and return its result unchanged. */
STATIC int __volume_proto_chunk_reject(volume_proto_t *volume_proto, const chkid_t *chkid,
                                     const nid_t *bad, chkinfo_t *chkinfo)
{
        table1_t *t1 = &volume_proto->table1;

        return t1->chunk_reject(t1, chkid, bad, chkinfo);
}

/**
 * Extend both tables so that chunk index idx is covered.
 *
 * table1 is extended to (idx + 1) / FILE_PROTO_EXTERN_ITEM_COUNT tables
 * (integer division, but never fewer than 1); table2 is extended to
 * idx + 1 chunks with __OP_WRITE.
 *
 * @param volume_proto  volume to extend
 * @param idx           zero-based chunk index that must become valid
 * @return 0 on success, otherwise the error of the failing extend call
 */
STATIC int __volume_proto_chunk_extend(volume_proto_t *volume_proto, const int idx)
{
        int ret;
        int chknum = idx + 1;
        int table_count = chknum / FILE_PROTO_EXTERN_ITEM_COUNT;
        table1_t *t1 = &volume_proto->table1;
        table2_t *t2 = &volume_proto->table2;

        if (table_count == 0) {
                table_count = 1;
        }

        ret = t1->extend(t1, table_count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = t2->extend(t2, chknum, __OP_WRITE);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/* Default operation table; __volume_proto_create() copies it into each new
 * volume_proto's ops field. Only post_load is populated, and only when
 * USE_ROW2 is defined; load and destroy are left NULL. */
static volume_proto_op_t __volume_proto_ops = {
        .load      = NULL,
#ifdef USE_ROW2
        .post_load = volume_proto_post_load,
#else
        .post_load = NULL,
#endif
        .destroy   = NULL,
};

/**
 * Clean up the vfm of a table chunk.
 *
 * The chunk is first checked through table1->chunk_check (__OP_WRITE);
 * only if that succeeds is table2->vfm_cleanup invoked.
 *
 * @param volume_proto  volume owning the chunk
 * @param tid           id of the chunk to clean up
 * @return 0 on success, otherwise the error of the failing step
 */
STATIC int __volume_proto_vfm_cleanup(volume_proto_t *volume_proto, const chkid_t *tid)
{
        int ret;
        table1_t *t1;
        table2_t *t2;

        DBUG(CHKID_FORMAT" check\n", CHKID_ARG(tid));

        t1 = &volume_proto->table1;
        ret = t1->chunk_check(t1, tid, __OP_WRITE, NULL);
        if (unlikely(ret)) {
                DBUG("check "CHKID_FORMAT" ret %u %s\n", CHKID_ARG(tid), ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        t2 = &volume_proto->table2;
        ret = t2->vfm_cleanup(t2, tid);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Overwrite the vfm of a raw chunk via table2->vfm_set_dangerously.
 *
 * The id derived from chkid by cid2tid() is checked through
 * table1->chunk_check (__OP_WRITE) before the vfm is set.
 *
 * @param volume_proto  volume owning the chunk
 * @param chkid         raw chunk id; any other type yields EINVAL
 * @param vfm           vfm value to store
 * @return 0 on success, EINVAL for a non-raw chunk id, otherwise the
 *         error of the failing step
 */
STATIC int __volume_proto_vfm_set_dangerously(volume_proto_t *volume_proto, const chkid_t *chkid, const vfm_t *vfm)
{
        int ret;
        chkid_t tid;
        table1_t *t1;
        table2_t *t2;

        if (unlikely(chkid->type != __RAW_CHUNK__)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        cid2tid(&tid, chkid);

        t1 = &volume_proto->table1;
        ret = t1->chunk_check(t1, &tid, __OP_WRITE, NULL);
        if (unlikely(ret)) {
                DWARN("check "CHKID_FORMAT" ret %u %s\n", CHKID_ARG(&tid), ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        t2 = &volume_proto->table2;
        ret = t2->vfm_set_dangerously(t2, chkid, vfm);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Read the vfm of a raw chunk via table2->vfm_get.
 *
 * The id derived from chkid by cid2tid() is checked through
 * table1->chunk_check (with __OP_WRITE) before the vfm is read.
 *
 * @param volume_proto  volume owning the chunk
 * @param chkid         raw chunk id; any other type yields EINVAL
 * @param vfm           output buffer filled by table2->vfm_get
 * @return 0 on success, EINVAL for a non-raw chunk id, otherwise the
 *         error of the failing step
 */
STATIC int __volume_proto_vfm_get(volume_proto_t *volume_proto, const chkid_t *chkid, vfm_t *vfm)
{
        int ret;
        chkid_t tid;
        table1_t *t1;
        table2_t *t2;

        DBUG("vfm get "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        if (unlikely(chkid->type != __RAW_CHUNK__)) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        cid2tid(&tid, chkid);

        t1 = &volume_proto->table1;
        ret = t1->chunk_check(t1, &tid, __OP_WRITE, NULL);
        if (unlikely(ret)) {
                DWARN("check "CHKID_FORMAT" ret %u %s\n", CHKID_ARG(&tid), ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        t2 = &volume_proto->table2;
        ret = t2->vfm_get(t2, chkid, vfm);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Query table1->vfm_stat.
 *
 * @param volume_proto  volume to query
 * @param count         output written by table1->vfm_stat
 * @return 0 on success, otherwise the error returned by vfm_stat
 */
STATIC int __volume_proto_vfm_stat(volume_proto_t *volume_proto, int *count)
{
        int ret;
        table1_t *t1;

        t1 = &volume_proto->table1;
        ret = t1->vfm_stat(t1, count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Call table2->vfm_add once per table1 table.
 *
 * For table index i the call is made with the raw chunk id whose idx is
 * i * FILE_PROTO_EXTERN_ITEM_COUNT (all other id fields are taken from the
 * volume's own chkid). EEXIST and ENOENT from vfm_add are skipped; any
 * other error aborts the loop.
 *
 * @param volume_proto  volume to update
 * @param nid           node id(s) forwarded to vfm_add
 * @param count         count forwarded to vfm_add
 * @return 0 on success, otherwise the first non-ignored vfm_add error
 */
STATIC int __volume_proto_vfm_add(volume_proto_t *volume_proto, const nid_t *nid, int count)
{
        int ret, i;
        chkid_t chkid;
        table2_t *t2 = &volume_proto->table2;
        int table_count = volume_proto->table1.table_count;

        i = 0;
        while (i < table_count) {
                /* Build the id of the first raw chunk covered by table i. */
                chkid = volume_proto->chkid;
                chkid.idx = i;
                chkid.idx *= FILE_PROTO_EXTERN_ITEM_COUNT;
                chkid.type = __RAW_CHUNK__;

                ret = t2->vfm_add(t2, &chkid, nid, count);
                if (unlikely(ret)) {
                        if (ret != EEXIST && ret != ENOENT) {
                                GOTO(err_ret, ret);
                        }
                }

                i++;
        }

        return 0;
err_ret:
        return ret;
}


/**
 * Allocate a zeroed volume_proto and wire up its defaults.
 *
 * Initializes the connection list and its lock, installs the method
 * table (function pointers), records the chunk id, sets the status to
 * VOLUME_PROTO_STATUS_INIT, creates the lease and initializes the
 * recovery QoS state.
 *
 * @param _volume_proto  output; receives the new object on success
 * @param chkid          chunk id of the volume
 * @return 0 on success, or the ymalloc error code; failures of
 *         plock_init / lease_create go to UNIMPLEMENTED(__DUMP__)
 */
STATIC int __volume_proto_create(volume_proto_t **_volume_proto, const chkid_t *chkid)
{
        int ret;
        volume_proto_t *proto;

        ret = ymalloc((void **)&proto, sizeof(*proto));
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        memset(proto, 0x0, sizeof(*proto));

        /* connection tracking */
        count_list_init(&proto->connect.connect_list);
        ret = plock_init(&proto->connect.rwlock, "connect.plock");
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        /* lifecycle hooks */
        proto->cleanup = __volume_proto_cleanup;
        proto->suicide = __volume_proto_suicide;
        proto->cleanup_bh1 = __volume_proto_cleanup_bh1;
        proto->cleanup_bh2 = __volume_proto_cleanup_bh2;

        /* chunk management */
        proto->needreload = __volume_proto_needreload;
        proto->chunk_getinfo = __volume_proto_chunk_getinfo;
        proto->chunk_cleanup = __volume_proto_chunk_cleanup;
        proto->chunk_check = __volume_proto_chunk_check;
        proto->chunk_sync = __volume_proto_chunk_sync;
        proto->chunk_set = __volume_proto_chunk_set;
        proto->chunk_localize = __volume_proto_chunk_localize;
        proto->chunk_allocate = __volume_proto_chunk_allocate;
        proto->chunk_iterator1 = __volume_proto_chunk_iterator1;
        proto->chunk_iterator2 = __volume_proto_chunk_iterator2;
        proto->chunk_unintact1 = __volume_proto_chunk_unintact1;
        proto->chunk_unintact2 = __volume_proto_chunk_unintact2;
        proto->chunk_update = __volume_proto_chunk_update;
        proto->chunk_reject = __volume_proto_chunk_reject;
        proto->chunk_extend = __volume_proto_chunk_extend;

        /* migration */
        proto->move = __volume_proto_move;
        proto->chunk_move = __volume_proto_chunk_move;

        /* attributes */
        proto->getattr = __volume_proto_getattr;
        proto->setattr = __volume_proto_setattr;
        proto->stat = __volume_proto_stat;

        /* iSCSI connections */
        proto->iscsiConnection = __volume_proto_iscsi_connection;
        proto->iscsiConnect = __volume_proto_iscsi_connect;
        proto->iscsiDisconnect = __volume_proto_iscsi_disconnect;

        /* extended attributes */
        proto->xattr_set = __volume_proto_xattr_set;
        proto->xattr_get = __volume_proto_xattr_get;
        proto->xattr_list = __volume_proto_xattr_list;
        proto->xattr_remove = __volume_proto_xattr_remove;

        /* vfm */
        proto->vfm_cleanup = __volume_proto_vfm_cleanup;
        proto->vfm_set_dangerously = __volume_proto_vfm_set_dangerously;
        proto->vfm_get = __volume_proto_vfm_get;
        proto->vfm_add = __volume_proto_vfm_add;
        proto->vfm_stat = __volume_proto_vfm_stat;

        /* identity and initial state */
        proto->chkid = *chkid;
        proto->ltime = 0;

        proto->status = VOLUME_PROTO_STATUS_INIT;
        proto->ops = __volume_proto_ops;
        memset(&proto->chunk_ops, 0x0, sizeof(proto->chunk_ops));

        ret = lease_create(&proto->lease, chkid);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        recovery_qos_init(&proto->qos);

        *_volume_proto = proto;

        return 0;
err_ret:
        return ret;
}

/**
 * Load the IOPS throttle setting from the THROT_IOPS xattr into bucket.
 *
 * A missing key (ENOKEY) is not an error: the bucket is left untouched
 * and 0 is returned.
 *
 * @param volume_proto  volume whose xattr is read
 * @param bucket        token bucket configured via throt_qos_set
 * @return 0 on success or when the xattr is absent, otherwise the error
 *         from ymalloc, xattr_get or throt_qos_set
 */
static int __volume_proto_iops_bucket_load(volume_proto_t *volume_proto, token_bucket_t *bucket)
{
        int ret, buflen;
        char *buf;

        buflen = MAX_BUF_LEN;

        ret = ymalloc((void **)&buf, MAX_BUF_LEN);
        if (ret)
                GOTO(err_ret, ret);

        memset(buf, 0x0, MAX_BUF_LEN);

        ret = volume_proto->xattr_get(volume_proto, THROT_IOPS, buf, &buflen);
        if (ret) {
                if (ret == ENOKEY) {
                        goto out;
                } else {
                        GOTO(err_free, ret);
                }
        }

        ret = throt_qos_set(bucket, buf);
        if (unlikely(ret)) {
                /* buf is still owned here; release it on the way out. */
                GOTO(err_free, ret);
        }

out:
        yfree((void **)&buf);
        return 0;
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

/**
 * Load the bandwidth throttle setting from the THROT_BW xattr.
 *
 * bw_bucket is zeroed first. A missing key (ENOKEY) is not an error: the
 * bucket stays zeroed and 0 is returned.
 *
 * @param volume_proto  volume whose xattr is read
 * @param bw_bucket     token bucket configured via throt_bw_set
 * @return 0 on success or when the xattr is absent, otherwise the error
 *         from ymalloc, xattr_get or throt_bw_set
 */
static int __volume_proto_bw_bucket_load(volume_proto_t *volume_proto, token_bucket_t *bw_bucket)
{
        int ret, buflen;
        char *buf;

        memset(bw_bucket, 0x0, sizeof(token_bucket_t));

        buflen = MAX_BUF_LEN;

        ret = ymalloc((void **)&buf, MAX_BUF_LEN);
        if (ret)
                GOTO(err_ret, ret);

        memset(buf, 0x0, MAX_BUF_LEN);

        ret = volume_proto->xattr_get(volume_proto, THROT_BW, buf, &buflen);
        if (ret) {
                if (ret == ENOKEY) {
                        goto out;
                } else {
                        GOTO(err_free, ret);
                }
        }

        ret = throt_bw_set(bw_bucket, buf);
        if (unlikely(ret)) {
                /* buf is still owned here; release it on the way out. */
                GOTO(err_free, ret);
        }

out:
        yfree((void **)&buf);
        return 0;
err_free:
        yfree((void **)&buf);
err_ret:
        return ret;
}

/**
 * Derive latency->valid from the ANALYSIS_LATENCY xattr.
 *
 * The xattr is parsed with atoi() into "times"; a missing xattr
 * (ENOKEY or ENOENT) counts as 0. Then:
 *   0 < times < USEC_PER_SEC  ->  valid = USEC_PER_SEC / times
 *   times >= USEC_PER_SEC     ->  valid = 1
 *   otherwise                 ->  valid = 0
 *
 * @param volume_proto  volume whose xattr is read
 * @param latency       object whose valid field is set
 * @return 0 on success, otherwise the ymalloc or xattr_get error
 *         (valid is reset to 0 when xattr_get fails)
 */
static int __volume_proto_latency_load(volume_proto_t *volume_proto, latency_t *latency)
{
        int ret;
        int times = 0;
        int buflen = MAX_BUF_LEN;
        char *buf;

        ret = ymalloc((void **)&buf, MAX_BUF_LEN);
        if (ret) {
                GOTO(err_ret, ret);
        }

        memset(buf, 0x0, MAX_BUF_LEN);

        ret = volume_proto->xattr_get(volume_proto, ANALYSIS_LATENCY, buf, &buflen);
        if (!ret) {
                times = atoi(buf);
        } else if (ret == ENOKEY || ret == ENOENT) {
                /* attribute not configured */
                times = 0;
        } else {
                GOTO(err_free, ret);
        }

        if (times > 0 && times < USEC_PER_SEC) {
                latency->valid = USEC_PER_SEC/times;
        } else if (times >= USEC_PER_SEC) {
                latency->valid = 1;
        } else {
                latency->valid = 0;
        }

        yfree((void **)&buf);
        return 0;
err_free:
        latency->valid = 0;
        yfree((void **)&buf);
err_ret:
        return ret;
}

#if ECLOG_ENABLE
/**
 * Allocate the volume's eclog pointer array and load the EC logs.
 *
 * Nothing is done (and 0 is returned) when neither EC_ISEC(ec) nor
 * EC_ISLOG(ec) holds. Otherwise an array of ECLOG_LOG_MAX(ec) zeroed
 * eclog_t pointers is allocated and eclog_load() is called; the array
 * is freed again if eclog_load() fails.
 *
 * @param volume_proto  volume receiving the eclog array
 * @param ec            EC descriptor of the volume
 * @return 0 on success, otherwise the ymalloc or eclog_load error
 */
static int __volume_proto_eclog_load(volume_proto_t *volume_proto, ec_t *ec)
{
        int ret, count;

        if (!(EC_ISEC(ec) || EC_ISLOG(ec))) {
                goto out;
        }

        count = ECLOG_LOG_MAX(ec);

        ret = ymalloc((void **)&volume_proto->eclog, count * sizeof(eclog_t *));
        if (ret) {
                GOTO(err_ret, ret);
        }

        memset(volume_proto->eclog, 0x0, count * sizeof(eclog_t *));

        ret = eclog_load(volume_proto);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

out:
        return 0;
err_free:
        yfree((void **)&volume_proto->eclog);
err_ret:
        return ret;
}
#endif

/**
 * Renew the volume's lease by calling lease_set().
 *
 * @param volume_proto  volume whose lease is renewed
 * @return 0 on success, otherwise the error returned by lease_set
 */
int volume_proto_renew(volume_proto_t *volume_proto)
{
        int ret = lease_set(&volume_proto->lease);

        if (ret) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

void volume_proto_load_lsv_callback(void *arg)
{
        volume_proto_t *volume_proto = (volume_proto_t *)arg;

        int ret = __try_load_lsv(volume_proto);
        if (unlikely(ret)) {
                DFATAL("loading lsv volume failed, ret=%d\r\n", ret);
        }
}

/**
 * Create and fully load the controller object for a volume.
 *
 * Sequence, as implemented below:
 *   1. refuse with EREMCHG unless the first replica of chkinfo is local;
 *   2. allocate the volume_proto and take its lease;
 *   3. initialize analysis, latency and snapshot state, table1 and table2;
 *   4. connect, load table1 and its attributes, then iterate the main
 *      table and every extension table with table1_iterator_callback;
 *   5. load the clone source xattr, QoS buckets and latency setting;
 *   6. open the ramdisk (RAMDISK_ENABLE), connect again and verify that the
 *      stored chkinfo still matches the metadata (EAGAIN if it changed);
 *   7. load the EC log (ECLOG_ENABLE), renew the lease, redo transactions
 *      and, under USE_ROW2, try to load the LSV layer.
 *
 * @param _volume_proto  output; receives the loaded object
 * @param pool           pool name passed to table1_init
 * @param parent         parent chunk id
 * @param chkinfo        chunk info of the volume; its id must be of type
 *                       __VOLUME_CHUNK__ (asserted)
 * @return 0 on success, otherwise the error code of the failing step
 *
 * NOTE(review): *_volume_proto is assigned before the final lease_set /
 * trans_redo / __try_load_lsv steps; if one of those fails the object is
 * destroyed on the error path while the caller's pointer still refers to
 * it — confirm callers ignore the output on failure.
 */
int volume_proto_load(volume_proto_t **_volume_proto, const char *pool,
                      const chkid_t *parent, const chkinfo_t *chkinfo)
{
        int ret;
        volume_proto_t *volume_proto;
        const chkid_t *chkid;
        table_proto_t *table_proto;
        table1_t *table1;

        /* Only load the volume when its first replica is local. */
        if (!net_islocal(&chkinfo->diskid[0].id)) {
                ret = EREMCHG;
                GOTO(err_ret, ret);
        }

        ANALYSIS_BEGIN(0);

        DINFO("load "CHKID_FORMAT" begin\n", CHKID_ARG(&chkinfo->id));

        chkid = &chkinfo->id;
        YASSERT(chkid->type == __VOLUME_CHUNK__);
        YASSERT(CHKINFO_SIZE(LICH_REPLICA_MAX) <= FILE_PROTO_EXTERN_ITEM_SIZE);

        ret = __volume_proto_create(&volume_proto, chkid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // must be done at the very beginning
        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret)) {
#if 0
                /* the last lease not timeout,just try again, do not need md_map_drop & locator_rpc_lookup */
                if (ret == EREMCHG)
                        ret = EAGAIN;
#endif
                GOTO(err_free, ret);
        }

        ret = volume_proto_analysis_init(volume_proto);
        if (unlikely(ret))
                GOTO(err_lease, ret);

        ret = volume_proto_latency_init(volume_proto);
        if (ret)
                GOTO(err_lease, ret);

        ret = volume_proto_snapshot_init(volume_proto);
        if (unlikely(ret))
                GOTO(err_lease, ret);

        ret = table1_init(&volume_proto->table1, pool, chkid, parent, volume_proto);
        if (unlikely(ret))
                GOTO(err_lease, ret);

        volume_proto->table1.volume_proto = volume_proto;

        ret = table2_init(&volume_proto->table2, &volume_proto->table1);
        if (unlikely(ret)) {
                GOTO(err_lease, ret);
        }

        volume_proto->table2.volume_proto = volume_proto;

        ret = __volume_proto_connect(volume_proto, parent, chkid);
        if (unlikely(ret))
                GOTO(err_lease, ret);

        // TODO if volume is too large, load L1 is too slow
        ret = table1_load(&volume_proto->table1, &volume_proto->table1.parent, chkinfo);
        if (unlikely(ret))
                GOTO(err_lease, ret);

        table1 = &volume_proto->table1;
        /* Select the chunk operations matching the volume's EC configuration. */
        volume_proto->chunk_ops = *chunk_proto_ops_get(&table1->fileinfo.ec);

        ret = table1->loadattr(table1, volume_proto);
        if (unlikely(ret))
                GOTO(err_lease, ret);

        volume_proto->status = VOLUME_PROTO_STATUS_PRE_LOAD;

        // -- bottom half
        {
                // TODO load L2, delay or concurrency
                /* Walk the main table first, then every allocated extension table. */
                table_proto = table1->table_proto;
                ret = table_proto->iterator(table_proto, table1_iterator_callback, -1, volume_proto);
                if (unlikely(ret)) {
                        GOTO(err_lease, ret);
                }

                int i;
                for (i=0; i < table1->ext_info.chunk_count; ++i) {
                        table_proto = table1->ext[i];
                        if (!table_proto)
                                continue;

                        ret = table_proto->iterator(table_proto, table1_iterator_callback, -1, volume_proto);
                        if (unlikely(ret)) {
                                GOTO(err_lease, ret);
                        }
                }
        }

        if (is_volume_clone(&table1->fileinfo)) {
                // TODO load source
                int valuelen = MAX_NAME_LEN;
                ret = table1->xattr_get(table1, LICH_SYSTEM_ATTR_SOURCE, table1->source, &valuelen);
                /* NOTE(review): a failure here never aborts the load; ENOKEY is
                 * logged and every other error is dropped silently. */
                if (unlikely(ret)) {
                        if (ret == ENOKEY) {
                                DWARN("volume "CHKID_FORMAT" source not found\n", CHKID_ARG(&volume_proto->chkid));
                        }
                }
        }

        // for QoS
        ret = __volume_proto_iops_bucket_load(volume_proto, &volume_proto->iops_bucket);
        if (ret)
                GOTO(err_lease, ret);

        ret = __volume_proto_bw_bucket_load(volume_proto, &volume_proto->bw_bucket);
        if (ret)
                GOTO(err_lease, ret);

        ret = __volume_proto_latency_load(volume_proto, &volume_proto->latency);
        if (ret)
                GOTO(err_lease, ret);

        // Acquire the lease to guarantee that the volume controller is unique.
        // Every later operation on lease-protected resources must first check that the lease is valid.
#if RAMDISK_ENABLE
        ret = ramdisk_open(&volume_proto->ramdisk_fd, chkid);
        if (unlikely(ret))
                GOTO(err_lease, ret);
#endif

        /* NOTE(review): second __volume_proto_connect call in this function
         * (the first is before table1_load) — confirm it is intentional. */
        ret = __volume_proto_connect(volume_proto, parent, chkid);
        if (unlikely(ret))
                GOTO(err_close, ret);

        {
                table_proto = table1->table_proto;

                CHKINFO_DUMP(table_proto->chkinfo, D_BUG);

                /* Re-read the volume's chkinfo from metadata and compare it with
                 * the copy held by the loaded table. */
                char _chkinfo[CHKINFO_MAX];
                chkinfo_t *tmp = (void *)_chkinfo;
                ret = md_chunk_getinfo1(volume_proto->table1.pool,
                                       &volume_proto->table1.parent, &volume_proto->chkid, tmp, NULL);
                if (ret)
                        GOTO(err_close, ret);

                /* chkinfo may have been changed by another task while volume_proto was reloading, so just retry */
                //YASSERT(memcmp(table_proto->chkinfo, tmp, sizeof(*tmp)) == 0);
                if (memcmp(table_proto->chkinfo, tmp, sizeof(*tmp))) {
                        ret = EAGAIN;
                        GOTO(err_close, ret);
                }
        }

        /* Never move the status backwards. */
        if(volume_proto->status < VOLUME_PROTO_STATUS_LOAD1)
                volume_proto->status = VOLUME_PROTO_STATUS_LOAD1;

        volume_proto->uptime = gettime();

#if ECLOG_ENABLE
        ret = __volume_proto_eclog_load(volume_proto, &table1->fileinfo.ec);
        if (ret)
                GOTO(err_close, ret);
#endif

        /* Output is published here, before the remaining steps below. */
        *_volume_proto = volume_proto;

        ANALYSIS_END(0, IO_WARN, id2str(&chkinfo->id));

        /* Renew the lease once the load work is done. */
        ret = lease_set(&volume_proto->lease);
        if (unlikely(ret))
                GOTO(err_close, ret);

        ret = trans_redo(volume_proto);
        if (unlikely(ret))
                GOTO(err_close, ret);

#ifdef USE_ROW2
//
        // the LSV post-load must run only after the volume itself loaded successfully.
        //schedule_task_new("__try_load_lsv", volume_proto_load_lsv_callback, volume_proto, -1);
        ret = __try_load_lsv(volume_proto);
        if (unlikely(ret)) {
                GOTO(err_close, ret);
        }
#endif
        DINFO("load "CHKID_FORMAT" success, %p ltime %u, source (%s)\n", CHKID_ARG(&chkinfo->id),
              volume_proto, volume_proto->ltime, table1->source);

        return 0;
        /* Cleanup chain: each label falls through to the ones below it. */
err_close:
#if RAMDISK_ENABLE
        ramdisk_close(volume_proto->ramdisk_fd, chkid);
#endif
err_lease:
        lease_free(&volume_proto->lease);
err_free:
        volume_proto_destroy(volume_proto);
err_ret:
        DBUG("load "CHKID_FORMAT" fail, ret %d\n", CHKID_ARG(&chkinfo->id), ret);
        SWARN(0, "%s load "CHKID_FORMAT" fail\n", M_DATA_VOLUME_WARN,
              CHKID_ARG(&chkinfo->id));
        return ret;
}

#ifdef USE_ROW2
/**
 * Run the LSV post-load step for volumes whose format is a "rich LSV"
 * format (IS_RICH_LSV).
 *
 * For such volumes the LSV snapshot state is initialized every time; the
 * post-load itself only runs when the status is VOLUME_PROTO_STATUS_LOAD1.
 * The status is switched to LOAD2 before the call and restored to LOAD1
 * if volume_proto_post_load() fails.
 *
 * @param volume_proto  volume to load
 * @return 0 on success or when nothing had to be done, otherwise the
 *         error returned by volume_proto_post_load
 */
static int __try_load_lsv(volume_proto_t *volume_proto) {
        int ret;
        table1_t *t1 = &volume_proto->table1;
        volume_format_t format = lich_volume_format(&t1->fileinfo);

        DINFO("try load %s format %d\n", id2str(&t1->chkid), format);

        if (!IS_RICH_LSV(format)) {
                goto out;
        }

        volume_proto_snapshot_lsv_init(volume_proto);

        if (volume_proto->status == VOLUME_PROTO_STATUS_LOAD1) {
                ANALYSIS_BEGIN(0);

                /* multi-load protection: mark as LOAD2 before loading */
                volume_proto->status = VOLUME_PROTO_STATUS_LOAD2;
                ret = volume_proto_post_load(volume_proto);
                ANALYSIS_END(0, IO_WARN, id2str(&t1->chkid));

                if (unlikely(ret)) {
                        volume_proto->status = VOLUME_PROTO_STATUS_LOAD1;
                        GOTO(err_ret, ret);
                }
        }

out:
        DBUG("load %s format %d status %d successfully\n",
              id2str(&t1->chkid), format, volume_proto->status);

        return 0;
err_ret:
        return ret;
}

/**
 * Write one chunk IO.
 *
 * Steps: internal pre-write, table2->chunk_write, then a table2
 * chunk_check with __OP_WRITE. If the write itself fails the chunk is
 * reset via volume_proto_chunk_reset() before the error is returned.
 *
 * @param volume_proto  volume owning the chunk
 * @param io_opt        IO options forwarded to the pre-write step
 * @param chunk_io      IO descriptor to write
 * @return 0 on success, otherwise the error code of the failing step
 *         (ENOENT from the final check is asserted against)
 */
int IO_FUNC volume_proto_chunk_write(volume_proto_t *volume_proto, io_opt_t *io_opt,
                             chunk_io_t *chunk_io)
{
        int ret;
        table2_t *t2 = &volume_proto->table2;

        ANALYSIS_BEGIN(1);

        /* takes the clock from chkstat.chkstat_clock and increments chkstat.chkstat_clock */
        ret = __volume_proto_chunk_pre_write(volume_proto, io_opt, chunk_io);
        if (unlikely(ret)) {
                DBUG("chunk "CHKID_FORMAT" %s\n",
                     CHKID_ARG(&chunk_io->io.id), strerror(ret));
                GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(1, IO_WARN, "__volume_proto_chunk_write_01");

        ret = t2->chunk_write(t2, chunk_io);
        if (unlikely(ret)) {
                /* failed write: reset the chunk (result ignored) and report */
                volume_proto_chunk_reset(volume_proto, &chunk_io->io.id);
                GOTO(err_ret, ret);
        }

        ret = t2->chunk_check(t2, &chunk_io->io.id, __OP_WRITE, 0, NULL, NULL);
        if (unlikely(ret)) {
                DBUG(CHKID_FORMAT" postio check fail ret:%d\n",
                     CHKID_ARG(&chunk_io->io.id), ret);
                YASSERT(ret != ENOENT);
                GOTO(err_ret, ret);
        }

#if ISCSI_IO_RECORD
        char tmp[MAX_INFO_LEN];

        sprintf(tmp, "volume_proto_chunk_write "CHKID_FORMAT" (%llu, %llu)",
                        CHKID_ARG(&chunk_io->io.id),
                        (LLU)chunk_io->io.offset, (LLU)chunk_io->io.size);

        mbuffer_dump(&chunk_io->buf, 8, tmp);
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Read one chunk IO.
 *
 * Two phases, each with the same fallback for a missing chunk:
 *   1. __volume_proto_chunk_pre_read();
 *   2. table2->chunk_read().
 * When a phase returns ENOENT and the IO does not carry
 * __FILE_ATTR_NOFILL__, the request is served by table2->chunk_readzero().
 * If chunk_readzero() reports EEXIST the phase is retried once; the
 * YASSERT(retry == 0) fires if the retried phase yields ENOENT again.
 * Any other chunk_read() error resets the chunk before returning.
 *
 * @param volume_proto  volume owning the chunk
 * @param chunk_io      IO descriptor to read into
 * @return 0 on success (including zero-filled reads), otherwise the error
 *         code of the failing step; ENOENT is returned when
 *         __FILE_ATTR_NOFILL__ is set and the chunk does not exist
 */
int IO_FUNC volume_proto_chunk_read(volume_proto_t *volume_proto, chunk_io_t *chunk_io)
{
        int ret, retry;
        table2_t *table2 = &volume_proto->table2;


        (void) retry;
        retry = 0;
        /* Phase 1: pre-read checks. */
retry1:
        ret = __volume_proto_chunk_pre_read(volume_proto, chunk_io);
        if (unlikely(ret)) {
#if 0
                DBUG("chunk "CHKID_FORMAT" %s\n",
                     CHKID_ARG(&chunk_io->io.id), strerror(ret));
                GOTO(err_ret, ret);
#else
                if (ret == ENOENT) {
                        if (!(chunk_io->io.flags & __FILE_ATTR_NOFILL__)) {
                                /* a second ENOENT after a retry is not expected */
                                YASSERT(retry == 0);
                        
                                DBUG(CHKID_FORMAT" read zero, offset %ju size %u\n",
                                     CHKID_ARG(&chunk_io->io.id), chunk_io->io.offset, chunk_io->io.size);
                                ret = table2->chunk_readzero(table2, chunk_io);
                                if (unlikely(ret)) {
                                        /* EEXIST from chunk_readzero: redo the pre-read once */
                                        if (ret == EEXIST) {
                                                retry = 1;
                                                goto retry1;
                                        } else
                                                GOTO(err_ret, ret);
                                }

                                goto out;
                        } else {
                                GOTO(err_ret, ret);
                        }
                } else {
                        DBUG("chunk "CHKID_FORMAT" %s\n",
                             CHKID_ARG(&chunk_io->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }
#endif
        }

        /* Phase 2: the actual read; the retry flag starts over. */
        retry = 0;
retry2:
        ret = table2->chunk_read(table2, chunk_io);
        if (unlikely(ret)) {
#if 0
                if (ret != ENOENT) {
                        volume_proto_chunk_reset(volume_proto, &chunk_io->io.id);
                }

                GOTO(err_ret, ret);
#else
                if (ret == ENOENT) {
                        if (!(chunk_io->io.flags & __FILE_ATTR_NOFILL__)) {
                                /* a second ENOENT after a retry is not expected */
                                YASSERT(retry == 0);

                                DINFO(CHKID_FORMAT" read zero, offset %ju size %u\n",
                                     CHKID_ARG(&chunk_io->io.id), chunk_io->io.offset, chunk_io->io.size);
                                ret = table2->chunk_readzero(table2, chunk_io);
                                if (unlikely(ret)) {
                                        /* EEXIST from chunk_readzero: redo the read once */
                                        if (ret == EEXIST) {
                                                retry = 1;
                                                goto retry2;
                                        } else
                                                GOTO(err_ret, ret);
                                }

                                goto out;
                        } else {
                                GOTO(err_ret, ret);
                        }
                } else {
                        /* any other read error: reset the chunk before reporting */
                        volume_proto_chunk_reset(volume_proto, &chunk_io->io.id);
                        GOTO(err_ret, ret);
                }
#endif
        }

        /* The out label exists in both preprocessor branches; with
         * ISCSI_IO_RECORD the buffer is dumped via mbuffer_dump(). */
#if ISCSI_IO_RECORD
        char tmp[MAX_INFO_LEN];

out:
        sprintf(tmp, "volume_proto_chunk_read "CHKID_FORMAT" (%llu, %llu)",
                        CHKID_ARG(&chunk_io->io.id),
                        (LLU)chunk_io->io.offset, (LLU)chunk_io->io.size);

        mbuffer_dump(&chunk_io->buf, 8, tmp);
#else
out:
#endif

        return 0;
err_ret:
        return ret;
}

/**
 * Second-stage load of a volume: fill in the per-volume lsv_info from the
 * fileinfo stored in table1, then either format a brand-new volume or
 * initialise an existing one, depending on is_lsv_inited().
 *
 * On success the in-memory fileinfo has system_power_off cleared, is handed
 * to table_proto->setinfo(), and the volume status becomes
 * VOLUME_PROTO_STATUS_LOAD2.
 *
 * NOTE(review): loading_lock is write-locked here and is not released
 * anywhere in this function (neither on success nor on the error path);
 * presumably the format/init path or its async loader releases it -- confirm.
 *
 * @param volume_proto volume being loaded
 * @return 0 on success
 * @return the error code returned by the format/init routine otherwise
 */
int volume_proto_post_load(volume_proto_t *volume_proto)
{
        int ret;
        table1_t *table1 = &volume_proto->table1;
        fileinfo_t *fileinfo = &table1->fileinfo;
        lsv_volume_proto_t *lsv_info = &table1->lsv_info;
        volume_format_t format = lich_volume_format(fileinfo);

        /* start from a clean slate, then copy the persistent identity over */
        memset(lsv_info, 0, sizeof(*lsv_info));

        lsv_info->volume_proto = volume_proto;
        lsv_info->ino = table1->chkid.id;
        lsv_info->volume_format = format;
        lsv_info->max_size = VOLUME_SIZE_MAX;
        lsv_info->size = fileinfo->logical_size;

        /* page/chunk ids recorded in fileinfo */
        lsv_info->u.volume_page_id = fileinfo->volume_page_id;
        lsv_info->u.gc_os_page_id = fileinfo->gc_os_page_id;
        lsv_info->u.gc_bitmap_page_id = fileinfo->gc_bitmap_page_id;
        lsv_info->u.rcache_page_id = fileinfo->rcache_page_id;
        lsv_info->u.wbuf_page_id = fileinfo->wbuf_page_id;
        lsv_info->u.bitmap_chunk_id = fileinfo->bitmap_chunk_id;

        lsv_rwlock_init(&lsv_info->loading_lock);

        plock_wrlock(&lsv_info->loading_lock);

        /*
         * Read the value saved by the previous session to decide whether
         * this start needs a load or a recovery:
         *  - right after the volume has been loaded successfully the value
         *    is set to 0 (abnormal exit);
         *  - on exit the value is set to 1 (normal exit).
         */
        lsv_info->u.system_power_on = fileinfo->system_power_off;
        lsv_info->u.safe_mode = 1;

        if (is_lsv_inited(fileinfo)) {
                /* existing volume: clean shutdown -> load, otherwise recover */
                switch (lsv_info->u.system_power_on) {
                case 1:
                        lsv_info->u.start_mode = LSV_SYS_LOAD;
                        break;
                case 0:
                        lsv_info->u.start_mode = LSV_SYS_RECOVERY;
                        break;
                default:
                        YASSERT(0);
                        break;
                }

                // TODO read
                DWARN("init!ino %ju format %d power %u\n", lsv_info->ino, format, lsv_info->u.system_power_on);
                switch (format) {
                case VOLUME_FORMAT_ROW2:
                        DINFO("loading ROW2 volume...\r\n");
                        ret = row2_info_init(lsv_info);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                        break;
                case VOLUME_FORMAT_ROW3:
                        DINFO("loading ROW3 volume...\r\n");
                        ret = row3_info_init(lsv_info);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                        break;
                case VOLUME_FORMAT_LSV:
                        DINFO("loading LSV volume...\r\n");
                        ret = lsv_info_init(lsv_info);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                        break;
                default:
                        DINFO("loading OMG volume...\r\n");
                        YASSERT(0);
                        break;
                }
        } else {
                /* never initialised: format the on-disk layout first */
                lsv_info->u.start_mode = LSV_SYS_CREATE;

                // TODO load not ready, deadlock
                DWARN("format!ino %ju format %d power %u\n", lsv_info->ino, format, lsv_info->u.system_power_on);
                switch (format) {
                case VOLUME_FORMAT_ROW2:
                        DINFO("creating ROW2 volume...\r\n");
                        ret = row2_info_format(lsv_info);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                        break;
                case VOLUME_FORMAT_ROW3:
                        DINFO("creating ROW3 volume...\r\n");
                        ret = row3_info_format(lsv_info);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                        break;
                case VOLUME_FORMAT_LSV:
                        DINFO("creating LSV volume...\r\n");
                        ret = lsv_info_format(lsv_info);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                        break;
                default:
                        DINFO("creating OMG volume...\r\n");
                        YASSERT(0);
                        break;
                }

                // TODO update fileinfo
                fileinfo->bitmap_chunk_id = lsv_info->u.bitmap_chunk_id;
                fileinfo->attr |= __FILE_ATTR_INITED__;
        }

        /*
         * Clear the power-off flag (default value is 0) and hand the updated
         * fileinfo to the table; the result of setinfo() is not checked.
         */
        fileinfo->system_power_off = 0;
        table1->table_proto->setinfo(table1->table_proto, fileinfo,
                                     sizeof(fileinfo_t), TABLE_PROTO_INFO_ATTR);
        DWARN("set power=0\n");

        DINFO("ino %ju size %ju %ju page_id <volume %u gc %u %u rcache %u wbuf %u> bitmap_chunk_id %u start_mode %u safe %u\n",
              lsv_info->ino, lsv_info->max_size, lsv_info->size,
              lsv_info->u.volume_page_id,
              lsv_info->u.gc_os_page_id, lsv_info->u.gc_bitmap_page_id,
              lsv_info->u.rcache_page_id,
              lsv_info->u.wbuf_page_id,
              lsv_info->u.bitmap_chunk_id,
              lsv_info->u.start_mode, lsv_info->u.safe_mode);

        volume_proto->status = VOLUME_PROTO_STATUS_LOAD2;
        return 0;
err_ret:
        return ret;
}

/**
 * Tear down the LSV/ROW state attached to a volume by dispatching to the
 * destroy routine that matches the volume format.
 *
 * @param volume_proto volume to tear down; NULL is tolerated
 * @return 0 on success, or when there is nothing to do (NULL volume_proto
 *         or a RAW-format volume)
 * @return the error code returned by the format-specific destroy routine
 */
int volume_proto_post_destroy(volume_proto_t *volume_proto)
{
        int ret = 0;
        table1_t *table1;
        lsv_volume_proto_t *lsv_info;
        volume_format_t format;

        /*
         * table1 and lsv_info are embedded members, so their addresses can
         * never be NULL; the only pointer that can really be NULL here is
         * volume_proto itself, so that is what gets checked.
         */
        if (volume_proto == NULL)
                return 0;

        table1 = &volume_proto->table1;
        lsv_info = &table1->lsv_info;

        format = lich_volume_format(&table1->fileinfo);

        /* RAW volumes carry no LSV state; this path should not be reached for them */
        if (format == VOLUME_FORMAT_RAW) {
                return 0;
        }

        /* NOTE: waiting for the async task via loading_lock is currently disabled: */
        //plock_wrlock(&lsv_info->loading_lock);  //waiting for async task finish.

        if (format == VOLUME_FORMAT_ROW2) {
                DINFO("deleting ROW2 volume...\r\n");
                ret = row2_info_destroy(lsv_info, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (format == VOLUME_FORMAT_ROW3) {
                DINFO("deleting ROW3 volume...\r\n");
                ret = row3_info_destroy(lsv_info, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (format == VOLUME_FORMAT_LSV) {
                DINFO("deleting LSV volume...\r\n");
                ret = lsv_info_destroy(lsv_info);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                /* unknown format: treated as a programming error */
                DINFO("deleting OMG volume...\r\n");
                YASSERT(0);
        }

        /* success falls through with ret == 0 */
err_ret:
        return ret;
}

/**
 * Check whether a volume has finished loading.
 *
 * For IS_RICH_LSV() formats the volume must be in VOLUME_PROTO_STATUS_LOAD2
 * and lsv_info_check_ready() must succeed; for every other format the volume
 * must be in VOLUME_PROTO_STATUS_LOAD1.
 *
 * @param volume_proto volume to check; may be NULL
 * @return 0 when the volume is ready
 * @return ENOENT when volume_proto is NULL
 * @return EBUSY when the volume is not in the expected load status
 * @return the (non-zero) result of lsv_info_check_ready() otherwise
 */
int volume_proto_check_ready(volume_proto_t *volume_proto) {
        int ret = 0;
        table1_t *table1;
        volume_format_t format;

        /*
         * Return directly: the shared error path below dereferences
         * volume_proto and prints format, neither of which is valid here.
         */
        if (volume_proto == NULL) {
                DWARN("volume_proto is NULL, ret %d\n", ENOENT);
                return ENOENT;
        }

        table1 = &volume_proto->table1;
        format = lich_volume_format(&table1->fileinfo);

        // The bitmap is loaded asynchronously; the status flag is set when loading completes
        if (unlikely(IS_RICH_LSV(format))) {
                if (volume_proto->status != VOLUME_PROTO_STATUS_LOAD2) {
                        ret = EBUSY;
                        goto err_ret;
                }

                ret = lsv_info_check_ready(&table1->lsv_info);
                if (unlikely(ret)) {
                        goto err_ret;
                }
        } else {
                if (volume_proto->status != VOLUME_PROTO_STATUS_LOAD1) {
                        ret = EBUSY;
                        goto err_ret;
                }
        }

        DINFO("ino %ju status %u format %d\n", volume_proto->chkid.id, volume_proto->status, format);
        return 0;
err_ret:
        DWARN("ino %ju status %u format %d ret %d\n", volume_proto->chkid.id, volume_proto->status, format, ret);
        return ret;
}

/**
 * Emit a single DBUG line summarising a volume: parent/own ids, status,
 * format, table/chunk counters, size, the volgc tail (0 when no volgc info
 * is attached) and the row2_stat counters.
 *
 * @param volume_proto volume to describe
 * @return always 0
 */
int volume_proto_info(volume_proto_t *volume_proto) {
        table1_t *table1 = &volume_proto->table1;
        table2_t *table2 = &volume_proto->table2;
        volume_format_t format = lich_volume_format(&table1->fileinfo);
        lsv_volume_proto_t *lsv_info = &table1->lsv_info;
        lsv_volgc_info_t *volgc = lsv_info->volgc_info;

        /* volgc info may not be attached; report tail 0 in that case */
        uint32_t tail = volgc ? volgc->tail : 0;

        DBUG("ino %ju/%ju %u %u lvl %u %u %u size %ju tail %u stat %ju/%ju %ju/%ju data %ju %ju %ju %ju meta %ju %ju %ju head %ju %ju %ju cache %ju %ju\n",
              /* ino */
              table1->parent.id, table1->chkid.id,
              volume_proto->status, format,
              /* lvl */
              1 + table1->ext_info.chunk_count, table1->table_count, table2->chknum,
              /* size, tail */
              table1->fileinfo.size, tail,
              /* stat */
              lsv_info->row2_stat.write, lsv_info->row2_stat.align_write,
              lsv_info->row2_stat.read, lsv_info->row2_stat.align_read,
              /* data */
              lsv_info->row2_stat.data_malloc, lsv_info->row2_stat.data_write,
              lsv_info->row2_stat.data_read, lsv_info->row2_stat.data_remote_read,
              /* meta */
              lsv_info->row2_stat.meta_malloc, lsv_info->row2_stat.meta_write,
              lsv_info->row2_stat.meta_read,
              /* head */
              lsv_info->row2_stat.head_malloc, lsv_info->row2_stat.head_write,
              lsv_info->row2_stat.head_read,
              /* cache */
              lsv_info->row2_stat.cache_swapout, lsv_info->row2_stat.cache_swapin);

        return 0;
}
#endif
